datascan/bulk-creation-scripts/dataquality /lib.py (70 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Validation helpers for the data quality bulk-creation script.

Covers loading/validating the YAML config file, validating CLI arguments and
generating random datascan ids.
"""

import random
import re
import string
from typing import Any

import yaml
from yaml.loader import SafeLoader

# Not referenced in this module; kept because other modules may import it
# from here -- TODO confirm before removing.
from datascan import getDatascan

# Top-level keys every config document must define.
_REQUIRED_CONFIG_FIELDS = frozenset(
    {'projectId', 'locationId', 'bqTable', 'dataQualitySpec'}
)

# 'project_id.dataset_id.table_id'
_BQ_TABLE_PATTERN = re.compile(
    r'^[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+$'
)

# 'project_id.location_id.datascan_id'
_DATA_PROFILE_ID_PATTERN = re.compile(
    r'^[a-zA-Z0-9_-]+\.[a-z][a-z0-9-]+[a-z0-9]\.[a-z][a-z0-9-]+[a-z0-9]$'
)

# Number of random characters appended to the 'dq-' prefix.
_DATASCAN_ID_RANDOM_LENGTH = 33


class LineNumberLoader(SafeLoader):
    """SafeLoader that records the source line of every mapping it builds."""

    def construct_mapping(self, node, deep=False):
        """Build the mapping and store its starting line under '__line__'.

        The stored value is PyYAML's ``start_mark.line``, which is 0-based.
        """
        mapping = super().construct_mapping(node, deep)
        mapping['__line__'] = node.start_mark.line
        return mapping


def _displayLine(block) -> Any:
    """Return the 1-based line recorded for *block*, or 'unknown'.

    '__line__' is stored 0-based (see LineNumberLoader); editors and users
    count from 1, so convert before putting it into an error message.
    """
    line = block.get('__line__') if isinstance(block, dict) else None
    return line + 1 if isinstance(line, int) else 'unknown'


def removeLineKeys(config) -> Any:
    """Recursively strip the '__line__' bookkeeping keys from *config*.

    Args:
        config: A dict, list or scalar as produced by the YAML loader.

    Returns:
        A copy of *config* with the same shape (dict -> dict, list -> list,
        scalar returned as-is) and without any '__line__' key.
    """
    if isinstance(config, list):
        return [removeLineKeys(item) for item in config]
    if isinstance(config, dict):
        return {
            key: removeLineKeys(value)
            for key, value in config.items()
            if key != '__line__'
        }
    return config


def validateConfigFields(config) -> None:
    """Recursively check that no field in *config* is None.

    Args:
        config: A dict, list or scalar as produced by the YAML loader.

    Raises:
        ValueError: If any mapping value, at any depth, is None.
    """
    if isinstance(config, dict):
        for key, value in config.items():
            if value is None:
                raise ValueError(
                    f"Field '{key}' is None for the block at line "
                    f"{_displayLine(config)}"
                )
            validateConfigFields(value)
    elif isinstance(config, list):
        for item in config:
            validateConfigFields(item)


def validateConfigFile(config_path) -> list:
    """Load and validate the (possibly multi-document) YAML config file.

    Args:
        config_path: Path of the YAML config file.

    Returns:
        The list of config documents with the '__line__' keys removed.

    Raises:
        ValueError: If a document is not a mapping, misses a required field,
            defines no rule, has a malformed 'bqTable', or contains a None
            field.
    """
    # LineNumberLoader derives from SafeLoader, so yaml.load_all is safe here.
    with open(config_path, 'r', encoding='utf-8') as f:
        config_file = list(yaml.load_all(f, Loader=LineNumberLoader))

    for index, config in enumerate(config_file, start=1):
        # An empty document loads as None and a scalar/list document has no
        # keys; report both as validation errors instead of AttributeError.
        if not isinstance(config, dict):
            raise ValueError(
                f"Config document #{index} must be a YAML mapping, "
                f"got {type(config).__name__}"
            )

        line = _displayLine(config)

        missing = _REQUIRED_CONFIG_FIELDS.difference(config)
        if missing:
            raise ValueError(
                "Config file must define all the required config fields: "
                "('projectId', 'locationId', 'bqTable', 'dataQualitySpec'); "
                f"missing {sorted(missing)} at line {line}"
            )

        # 'rules' must be present and non-empty; a null or non-mapping
        # dataQualitySpec is reported the same way.
        spec = config['dataQualitySpec']
        if not isinstance(spec, dict) or not spec.get('rules'):
            raise ValueError(
                "Config file must define at least 1 rule for the block "
                f"at line {line}"
            )

        # fullmatch (unlike match + '$') also rejects a trailing newline.
        full_table_name = config['bqTable']
        if (not isinstance(full_table_name, str)
                or not _BQ_TABLE_PATTERN.fullmatch(full_table_name)):
            raise ValueError(
                f"bqTable - {full_table_name} does not match the expected "
                f"format 'project_id.dataset_id.table_id' at line {line}"
            )

        # Finally reject None anywhere in the nested structure.
        validateConfigFields(config)

    return [removeLineKeys(config) for config in config_file]


def validateCLI(gcp_project_id, location_id, data_profile_ids) -> None:
    """Validate the CLI arguments.

    Args:
        gcp_project_id: Value of '--gcp_project_id'.
        location_id: Value of '--location_id'.
        data_profile_ids: Iterable of ids passed via '--data_profile_ids',
            each formatted as 'project_id.location_id.datascan_id'.

    Raises:
        ValueError: If any argument is missing/empty or an id is malformed.
    """
    if not gcp_project_id or not location_id or not data_profile_ids:
        raise ValueError(
            "CLI input must define configs using the parameters: "
            "('--gcp_project_id', '--location_id', '--data_profile_ids'). "
        )

    for data_profile in data_profile_ids:
        if (not isinstance(data_profile, str)
                or not _DATA_PROFILE_ID_PATTERN.fullmatch(data_profile)):
            raise ValueError(
                f"data_profile_id - {data_profile} does not match the expected format 'project_id.location_id.datascan_id'"
            )


def generateDataScanId() -> str:
    """Generate a random datascan id.

    Returns:
        'dq-' followed by 33 random lowercase letters/digits.
    """
    letters_and_digits = string.ascii_lowercase + string.digits
    random_string = ''.join(
        random.choices(letters_and_digits, k=_DATASCAN_ID_RANDOM_LENGTH)
    )
    return f'dq-{random_string}'